Getting Started with Flink Rules via Flink LookupJoin




This article is based on Flink 1.14.0.

Prerequisites

If you only want an overview of Flink's join types, you can skip this part. If you intend to do hands-on development, you must first master the prerequisites: Calcite and Flink SQL internals. There are already many articles on these topics, for example: flink sql 知其所以然(六)| flink sql 约会 calcite(看这篇就够了) - 掘金, and [源码分析] 带你梳理 Flink SQL / Table API内部执行流程.

Flink Joins

Regular Join

For example, the commonly used inner join:

SELECT * FROM Orders JOIN Product ON Orders.productId = Product.id

This kind of JOIN requires that the data from both sides be kept in Flink state forever to guarantee correct results, which leads to unbounded state growth. You can configure a state TTL (time-to-live: table.exec.state.ttl) to avoid the unbounded growth, but note that this may affect the correctness of the query results.
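As an illustration, here is a minimal sketch of setting that TTL programmatically (a generic streaming TableEnvironment; the one-hour value is an arbitrary example, not a recommendation):

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
val tEnv = TableEnvironment.create(settings)

// Idle join state is dropped after one hour; a key that reappears later
// may then produce incorrect join results.
tEnv.getConfig.getConfiguration.setString("table.exec.state.ttl", "1 h")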

Interval Join

A JOIN performed according to a join condition plus a time constraint. It operates on two keyed streams, connecting each record on one stream with the records of the other stream that fall into a relative time window, according to the join condition. For example, to query orders together with their related payments, where the payment occurred within one hour before or after the order time:

SELECT ...
FROM Orders AS o
JOIN Payment AS p
ON o.orderId = p.orderId
AND p.payTime BETWEEN orderTime - INTERVAL '1' HOUR AND orderTime + INTERVAL '1' HOUR

Temporal Join

First, the concept of a temporal table: a dynamic table whose contents change over time and which may contain multiple snapshots of the table. A temporal table whose records' historical versions can be tracked and accessed is called a versioned table, e.g. a database changelog; one that only exposes the latest version is called a regular table, e.g. an ordinary database table.

In Flink, a table with a primary key constraint and an event-time attribute is a versioned table.

A Temporal Join joins against a versioned table: the main table can be enriched with details from a continuously updating versioned table, based on a time attribute and an equi-join condition. Both tables must use the same time semantics, either event time or processing time.

When event time is used, the versioned table retains all versions from the last watermark up to the current moment, and both sides must have watermarks configured. The right table must be CDC data with a correctly configured primary key, and the primary key must appear in the equi-join condition. For example:

-- The left table is a regular append-only table.
CREATE TABLE orders (
    order_id   STRING,
    price      DECIMAL(32,2),
    currency   STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time
) WITH (/* ... */);

-- The right table is a versioned table of currency rates (CDC data).
CREATE TABLE currency_rates (
    currency        STRING,
    conversion_rate DECIMAL(32, 2),
    update_time     TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,
    WATERMARK FOR update_time AS update_time,
    PRIMARY KEY(currency) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'value.format' = 'debezium-json',
    /* ... */
);

SELECT order_id, price, currency, conversion_rate, order_time
FROM orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
-- The primary key must appear in the join condition.
ON orders.currency = currency_rates.currency;

order_id  price  currency  conversion_rate  order_time
========  =====  ========  ===============  ==========
o_001     11.11  EUR       1.14             12:00:00
o_002     12.51  EUR       1.10             12:06:00

When processing time is used, you can treat the lookup (right) table as an ordinary HashMap that holds the latest full data set. Flink can join directly against a table in an external database system without keeping the latest versions in state. For example:

SELECT
  o.amount, o.currency, r.rate, o.amount * r.rate
FROM Orders AS o
JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
ON r.currency = o.currency;

-- Or join a table function
SELECT o_amount, r_rate
FROM Orders,
LATERAL TABLE (Rates(o_proctime))
WHERE r_currency = o_currency

Note: the "FOR SYSTEM_TIME AS OF" syntax does not support VIEWs / arbitrary latest tables because Flink's implementation would not quite match the semantics: the join processing of the left stream does not wait for the right-side versioned table (VIEW / table function) to complete its snapshot. My personal understanding is that the right-table rows the left table joins against might therefore not be the latest data.

Lookup Join

Like the processing-time Temporal Join, a Lookup Join queries the right table at the point in time when the JOIN operator executes. It is generally used to join dimension tables and supports only equi-joins. For example:

SELECT
  o.amount, o.currency, r.rate, o.amount * r.rate
FROM Orders AS o
JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
ON r.currency = o.currency;
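For context, the lookup table is normally backed by an external system that supports point queries. A minimal sketch of registering such a table (the JDBC connector, URL, and table name below are illustrative assumptions, not part of the original example):

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build())

// Hypothetical dimension table: each lookup key triggers a point query against
// the database, so the right side needs no Flink state.
tEnv.executeSql(
  """
    |CREATE TABLE LatestRates (
    |  currency STRING,
    |  rate DECIMAL(32, 2),
    |  PRIMARY KEY (currency) NOT ENFORCED
    |) WITH (
    |  'connector'  = 'jdbc',
    |  'url'        = 'jdbc:mysql://localhost:3306/rates_db',
    |  'table-name' = 'latest_rates'
    |)
  """.stripMargin)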

Lookup Join Execution Flow

This article walks through a Flink unit test; newcomers can use it as a starting point for developing custom rules.

Preparation

Build the flink-table module

In the flink-table directory, run: mvn clean package -Pfast,hive-2.1.1,scala-2.12 -DskipTests

Open the unit test files

The UTs for a Flink Rule include:

- Logical plan tests: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical
- Physical plan tests: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql, XXX/batch/sql
- Integration tests: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql, XXX/batch/sql

These are also the UTs you need to provide when submitting a Rule-related PR to the community; a minimal test sketch follows below.
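A plan test is usually a small class extending the planner's TableTestBase. A minimal sketch (class name, table, and query are illustrative; API names as in Flink 1.14):

import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.planner.utils.TableTestBase
import org.junit.Test

class MyRuleTest extends TableTestBase {

  private val util = streamTestUtil()
  // Register a source table with a processing-time attribute.
  util.addDataStream[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'proctime.proctime)

  @Test
  def testMyRule(): Unit = {
    // Compares the optimized plan with a checked-in XML snapshot of the expected plan.
    util.verifyExecPlan("SELECT a, b FROM MyTable WHERE a > 1")
  }
}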

Enable verbose logging

Before the code under test, add: Configurator.setAllLevels("", Level.TRACE)
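For completeness, the imports that one-liner needs (Log4j 2, which the Flink test modules use):

import org.apache.logging.log4j.Level
import org.apache.logging.log4j.core.config.Configurator

// Raise all loggers to TRACE so the planner logs every rule attempt.
Configurator.setAllLevels("", Level.TRACE)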

Tracing the SQL Execution

The following analysis is based on the file flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala. Run the unit test testJoinTemporalTable:

SELECT * FROM MyTable AS T
JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
ON T.a = D.id

SQL Parsing

The parser (Calcite grammar support) parses the "FOR SYSTEM_TIME AS OF" clause of the SQL statement into a SqlSnapshot (SqlNode); validate() then converts it into a LogicalSnapshot (RelNode). The resulting logical plan:

LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
  LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
    LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
    LogicalFilter(condition=[=($cor0.a, $0)])
      LogicalSnapshot(period=[$cor0.proctime])
        LogicalTableScan(table=[[default_catalog, default_database, LookupTable]])
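Incidentally, you can reproduce such plans for any query without a unit test; a sketch, assuming the tables are already registered in a TableEnvironment tEnv:

// Prints the abstract syntax tree (the logical plan above) followed by
// the optimized physical plan.
println(tEnv.explainSql(
  "SELECT * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id"))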

Optimizer Optimization

FlinkStreamProgram / FlinkBatchProgram define a series of rule sets that transform and optimize the logical/physical plan. This case goes through the following important transformations:

LogicalCorrelateToJoinFromLookupTableRuleWithFilter:

// From the class definition, the logical plan above matches this rule.
class LogicalCorrelateToJoinFromLookupTableRuleWithFilter
  extends LogicalCorrelateToJoinFromLookupTemporalTableRule(
    operand(classOf[LogicalCorrelate],
      operand(classOf[RelNode], any()),
      operand(classOf[LogicalFilter],
        operand(classOf[LogicalSnapshot],
          operand(classOf[RelNode], any())))),
    "LogicalCorrelateToJoinFromLookupTableRuleWithFilter"
  ) {
  override def matches(call: RelOptRuleCall): Boolean = {
    val snapshot: LogicalSnapshot = call.rel(3)
    val snapshotInput: RelNode = trimHep(call.rel(4))
    isLookupJoin(snapshot, snapshotInput)
  }
  // ...
}

// Once the rule matches, check whether this is a lookup join.
protected def isLookupJoin(snapshot: LogicalSnapshot, snapshotInput: RelNode): Boolean = {
  // ...
  // processing time, and the snapshotted table is a LookupTableSource
  isProcessingTime && snapshotOnLookupSource
}

Once the rule matches, the LogicalCorrelate is converted into a LogicalJoin:

LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
+- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
   +- LogicalSnapshot(period=[$cor0.proctime])
      +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]])

FlinkProjectJoinTransposeRule + ProjectRemoveRule: push the Project below the Join and prune unused fields.

// Swaps the Project with the Join beneath it, i.e. pushes the Project down.
public FlinkProjectJoinTransposeRule(
        PushProjector.ExprCondition preserveExprCondition,
        RelBuilderFactory relFactory) {
    super(operand(Project.class, operand(Join.class, any())), relFactory, null);
    this.preserveExprCondition = preserveExprCondition;
}

After these optimizations:

LogicalJoin(condition=[=($0, $5)], joinType=[inner])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+- LogicalSnapshot(period=[$cor0.proctime])
   +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]])

Next, the Volcano rules explore alternative combinations of the logical plan to produce the optimal plan. After execution, the cheapest plan is:

12129 [main] DEBUG org.apache.calcite.plan.RelOptPlanner [] - Cheapest plan:
FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner]): rowcount = 3.0E7, cumulative cost = {4.0E8 rows, 5.0E8 cpu, 1.37E10 io, 0.0 network, 0.0 memory}, id = 403
  FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 4.8E9 io, 0.0 network, 0.0 memory}, id = 378
  FlinkLogicalSnapshot(period=[$cor0.proctime]): rowcount = 1.0E8, cumulative cost = {2.0E8 rows, 2.0E8 cpu, 4.0E9 io, 0.0 network, 0.0 memory}, id = 402
    FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.0E9 io, 0.0 network, 0.0 memory}, id = 381

// Final result:
FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
:- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+- FlinkLogicalSnapshot(period=[$cor0.proctime])
   +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age])

Attempted Rules

Rules                                                           Attempts   Time (us)
FlinkJoinPushExpressionsRule                                    2          553
JoinConditionPushRule                                           2          152
FlinkLogicalTableSourceScanConverter(in:NONE,out:LOGICAL)       1          54,956
FlinkLogicalJoinConverter(in:NONE,out:LOGICAL)                  1          4,787
FlinkLogicalSnapshotConverter(in:NONE,out:LOGICAL)              1          3,162
FlinkLogicalDataStreamTableScanConverter(in:NONE,out:LOGICAL)   1          1,403
SimplifyJoinConditionRule                                       1          249
* Total                                                         9          65,262

The converters here are registered in LOGICAL_CONVERTERS, a collection of logical rules that convert Calcite nodes into Flink nodes.

For example, FlinkLogicalSnapshotConverter:

// Converts a LogicalSnapshot into a FlinkLogicalSnapshot.
class FlinkLogicalSnapshotConverter
  extends ConverterRule(
    // Matches a LogicalSnapshot with no Convention; the output convention is FlinkConventions.LOGICAL.
    classOf[LogicalSnapshot],
    Convention.NONE,
    FlinkConventions.LOGICAL,
    "FlinkLogicalSnapshotConverter") {

  def convert(rel: RelNode): RelNode = {
    val snapshot = rel.asInstanceOf[LogicalSnapshot]
    val newInput = RelOptRule.convert(snapshot.getInput, FlinkConventions.LOGICAL)
    FlinkLogicalSnapshot.create(newInput, snapshot.getPeriod)
  }
}

Adding the operator that materializes processing time:

// convert time indicators
chainedProgram.addLast(TIME_INDICATOR, new FlinkRelTimeIndicatorProgram)

// Where necessary, a SqlFunction call is created here to materialize the processing-time attribute:
rexBuilder.makeCall(FlinkSqlOperatorTable.PROCTIME_MATERIALIZE, expr)

After this conversion:

FlinkLogicalCalc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age])
+- FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
   :- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
   +- FlinkLogicalSnapshot(period=[$cor0.proctime])
      +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age])

Physical Plan Optimization

After processing by the physical Volcano rules below,

FlinkJoinPushExpressionsRule
JoinConditionPushRule
StreamPhysicalTableSourceScanRule(in:LOGICAL,out:STREAM_PHYSICAL)
FlinkLogicalTableSourceScanConverter(in:NONE,out:LOGICAL)
StreamPhysicalSnapshotOnTableScanRule
StreamPhysicalCalcRule(in:LOGICAL,out:STREAM_PHYSICAL)
FlinkLogicalJoinConverter(in:NONE,out:LOGICAL)
StreamPhysicalDataStreamScanRule(in:LOGICAL,out:STREAM_PHYSICAL)
FlinkLogicalSnapshotConverter(in:NONE,out:LOGICAL)
FlinkLogicalDataStreamTableScanConverter(in:NONE,out:LOGICAL)
SimplifyJoinConditionRule

the optimal result is obtained:

Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age])
+- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age])
   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])

The key rules here:

- StreamPhysicalCalcRule: converts the FlinkLogicalCalc into a StreamPhysicalCalc.
- SnapshotOnTableScanRule: converts

FlinkLogicalJoin
:- FlinkLogicalDataStreamTableScan
+- FlinkLogicalSnapshot
   +- FlinkLogicalTableSourceScan

into

StreamPhysicalLookupJoin
+- StreamPhysicalDataStreamScan

Here is the key conversion logic for LookupJoin:

// This rule uses its parent class's match conditions.
class SnapshotOnTableScanRule
  extends BaseSnapshotOnTableScanRule("StreamPhysicalSnapshotOnTableScanRule") {
}

// As the operands show, it matches exactly the logical plan before optimization.
abstract class BaseSnapshotOnTableScanRule(description: String)
  extends RelOptRule(
    operand(classOf[FlinkLogicalJoin],
      operand(classOf[FlinkLogicalRel], any()),
      operand(classOf[FlinkLogicalSnapshot],
        operand(classOf[TableScan], any()))),
    description)
  with CommonLookupJoinRule

private def doTransform(
    join: FlinkLogicalJoin,
    input: FlinkLogicalRel,
    temporalTable: RelOptTable,
    calcProgram: Option[RexProgram]): StreamPhysicalLookupJoin = {
  val joinInfo = join.analyzeCondition
  val cluster = join.getCluster
  val providedTrait = join.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
  val requiredTrait = input.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
  // Convert the input from a logical node into a physical node. This triggers
  // StreamPhysicalDataStreamScanRule, which converts the FlinkLogicalDataStreamTableScan
  // into a StreamPhysicalDataStreamScan.
  val convInput = RelOptRule.convert(input, requiredTrait)
  new StreamPhysicalLookupJoin(
    cluster,
    providedTrait,
    convInput,
    temporalTable,
    calcProgram,
    joinInfo,
    join.getJoinType)
}

This completes the transformation into the physical plan.

Translating the Physical Plan

planner.translate() includes, among other steps:

val execGraph = translateToExecNodeGraph(optimizedRelNodes)
val transformations = translateToPlan(execGraph)

In translateToExecNodeGraph, the translateToExecNode method of the optimized physical plan nodes is invoked to produce the exec nodes. For example:

StreamPhysicalLookupJoin is converted into StreamExecLookupJoin.

In translateToPlan, the translateToPlanInternal method of each ExecNode is invoked. Take CommonExecLookupJoin as an example:

protected CommonExecLookupJoin(/* ... */) { /* ... */ }

// Validation and the async LookupFunction logic are omitted here.
public Transformation translateToPlanInternal(PlannerBase planner) {
    // ----------- Create the factory for the lookup-function operator -----------
    RelOptTable temporalTable = temporalTableSourceSpec.getTemporalTable(planner);
    UserDefinedFunction userDefinedFunction =
            LookupJoinUtil.getLookupFunction(temporalTable, lookupKeys.keySet());
    UserDefinedFunctionHelper.prepareInstance(
            planner.getTableConfig().getConfiguration(), userDefinedFunction);

    boolean isLeftOuterJoin = joinType == FlinkJoinType.LEFT;
    StreamOperatorFactory operatorFactory;
    operatorFactory =
            createSyncLookupJoin(
                    temporalTable,
                    planner.getTableConfig(),
                    lookupKeys,
                    (TableFunction) userDefinedFunction,
                    planner.getRelBuilder(),
                    inputRowType,
                    tableSourceRowType,
                    resultRowType,
                    isLeftOuterJoin,
                    planner.getExecEnv().getConfig().isObjectReuseEnabled());
    // ----------------------------------------------------------------------------

    // Convert into a Transformation.
    Transformation inputTransformation = (Transformation) inputEdge.translateToPlan(planner);
    return new OneInputTransformation(
            inputTransformation,
            getDescription(),
            operatorFactory,
            InternalTypeInfo.of(resultRowType),
            inputTransformation.getParallelism());
}

// Only the core logic is listed; it falls into three parts.
private StreamOperatorFactory createSyncLookupJoin() {
    // 1. Use the code generator to generate the lookup function, wrapped as a FlatMap function.
    GeneratedFunction generatedFetcher = LookupJoinCodeGenerator.generateSyncLookupFunction();

    // 2. Generate the Collector for the table function's output rows.
    GeneratedCollector generatedCollector = LookupJoinCodeGenerator.generateCollector();

    // 3. Finally, build the LookupJoinRunner ProcessFunction.
    // If there is a Calc on the lookup side (the right table), the runner carries the Calc logic.
    // Example: SELECT * FROM T JOIN DIM FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.b + 1
    // The fetcher reads the raw rows from the LookupFunction, applies the Calc,
    // and only then matches them against the main (left) stream.
    GeneratedFunction generatedCalc =
            LookupJoinCodeGenerator.generateCalcMapFunction(
                    config,
                    JavaScalaConversionUtil.toScala(projectionOnTemporalTable),
                    filterOnTemporalTable,
                    temporalTableOutputType,
                    tableSourceRowType);

    ProcessFunction processFunc =
            new LookupJoinWithCalcRunner(
                    generatedFetcher,
                    generatedCalc,
                    generatedCollector,
                    isLeftOuterJoin,
                    rightRowType.getFieldCount());
}

Finally, Transformations -> StreamGraph -> JobGraph; from here on, the flow is the same as with the DataStream API.


